package cn.kwq.pcsystem.mq;

import cn.kwq.pcsystem.utils.RedisUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;

import java.util.function.Consumer;

/**
 * Created with IntelliJ IDEA.
 * redis作为mq的服务类
 * @Author: kwq
 * @Date: 2023/03/06/16:45
 * @Description:
 */
@Slf4j
public class RedisMQService {
    //队列操作最大重试次数
    private static final int MAX_TRY_TIMES=2;

    /**
     * 入队
     * @param channel 管道名字
     * @param value 值
     * @param <T> 插入类型
     */
    public <T> void push(String channel, T value){
        if (value instanceof String ){
           //单纯字符串情况下
            RedisUtils.leftPush(channel,(String) value);
        }else {
            RedisUtils.leftPushObj(channel,value);
        }
    }

    /**
     * 出队操作，对象
     * @param channel 管道名字
     * @param consumer 消费操作
     */
    public void popObj(String channel, Consumer<Object> consumer){
        int tryTimes=1;
        Object value = RedisUtils.rightPopObj(channel);
        while (tryTimes<=MAX_TRY_TIMES){
            try{
                consumer.accept(value);
                break;
            }catch (Exception e){
                log.error("{}消费失败，即将开始第{}次重试",channel,tryTimes);
                RedisUtils.rightPushObj(channel,value);
                tryTimes++;
                e.printStackTrace();
            }
        }
        if (tryTimes>2){
            log.error("{}重试超过上限入队尾",channel);
            RedisUtils.leftPushObj(channel,value);
        }
    }

    /**
     * 出队操作，str
     * @param channel 管道名字
     * @param consumer 消费操作
     */
    public void popStr(String channel, Consumer<String> consumer){
        int tryTimes=1;
        String value = RedisUtils.rightPop(channel);
        while (tryTimes<=MAX_TRY_TIMES){
            try{
                consumer.accept(value);
                break;
            }catch (Exception e){
                    log.error("{}消费失败，即将开始第{}次重试",channel,tryTimes);
                    RedisUtils.rightPush(channel,value);
                    tryTimes++;
                e.printStackTrace();
                }
            }
        if (tryTimes>2){
            log.error("{}重试超过上限入队尾",channel);
            RedisUtils.leftPush(channel,value);
            }
        }

    /**
     * 发布消息
     * @param channel 管道名字
     * @param msg 信息
     * @param <T> 类型
     * @throws JsonProcessingException 序列化错误
     */
    public <T> void publish(String channel,T msg) throws JsonProcessingException {
            if (msg instanceof String ){
                //单纯字符串情况下
                RedisUtils.publishStr(channel,(String) msg);
            }else {
                RedisUtils.publishObj(channel,msg);
            }
        }


}

